home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Personal Computer World 2009 February
/
PCWFEB09.iso
/
Software
/
Resources
/
Chat & Communication
/
Digsby build 37
/
digsby_setup.exe
/
lib
/
mail
/
smtp.pyo
(
.txt
)
< prev
next >
Wrap
Python Compiled Bytecode
|
2008-10-13
|
12KB
|
370 lines
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
import traceback
from common import profile
from util import unpack_pstr, pack_pstr
from common.emailaccount import EmailAccount
from AccountManager import NETWORK_FLAG
from prefs import localprefprop
# Module-level flag, always True; nothing in the visible code reads it.
SMTP_UPGRADING = True
import smtplib
import re
# Line-length budget for encoded mail headers; SMTPsender.format_header
# derives the ``maxlinelen`` it hands to the email package's Header from it.
MAXHEADERLEN = 76
class SMTPsender:
    """Send a plain-text UTF-8 e-mail through an SMTP server via ``smtplib``.

    Typical use is ``SMTPsender(...).send_email(to=..., subject=..., body=...)``
    which connects, sends one message and disconnects.  ``begin_send``,
    ``send`` and ``finish_send`` expose the same steps individually.

    NOTE(review): this class was reconstructed from decompiled bytecode in
    which several expressions were garbled; the spots where intent had to be
    inferred are marked with NOTE(review) comments below.
    """

    # Lower-cased domains whose addresses are rejected by get_smtp_address.
    _ignore_domains = []
    # Transfer encoding applied to headers and body by _init_pref_encoding:
    # 'base64', 'qp' / 'quoted-printable' or 'none'.
    _mime_encoding = 'base64'

    # Bare address pattern; note the final label is limited to 2-4 characters.
    addrfmt = r'[\w\d_\.\-\+=]+\@(?:(?:[\w\d\-])+\.)+(?:[\w\d]{2,4})'
    # Matches an address at the end of the string.
    shortaddr_re = re.compile('%s$' % addrfmt)
    # Matches "Display Name <address>"; group 1 is the name, group 2 the address.
    longaddr_re = re.compile(r'^\s*(.*)\s+<(%s)>\s*$' % addrfmt)
    # Any LF or CRLF line ending, used to normalise outgoing text to CRLF.
    _newline_re = re.compile('\r?\n')

    def __init__(self, name, password, server, port=25, use_tls=False,
                 from_name=None, reply_to=None):
        """Store connection settings and prepare the MIME charset.

        name      -- SMTP login name; also the default sender / reply-to.
        password  -- SMTP password.
        server    -- SMTP host name.
        port      -- SMTP port (default 25).
        use_tls   -- issue STARTTLS before logging in.
        from_name -- sender to put in the From header (defaults to name).
        reply_to  -- reply-to address (defaults to name); stored only,
                     nothing in this class reads it back.
        """
        self.user_name = name
        self.password = password
        self.smtp_server = server
        self.smtp_port = port
        # Fall back to the login name only when no explicit value was given.
        if from_name is None:
            from_name = name
        if reply_to is None:
            reply_to = name
        self.from_name = from_name
        self.replyto_email = reply_to
        self._use_tls = use_tls
        self._init_pref_encoding()

    def format_header(self, key, name, email=None):
        """Return the encoded value for header `key`.

        Returns an ``email.header.Header`` for `name` alone, or the string
        '"<encoded name>" <email>' when `email` is given.

        Raises AssertionError when `key` leaves fewer than 10 characters of
        the MAXHEADERLEN budget for the value.
        """
        from email.header import Header
        # The header line starts with "<key>: ", so the key and those two
        # extra characters come out of the per-line budget.
        maxlength = MAXHEADERLEN - (len(key) + 2)
        if maxlength < 10:
            raise AssertionError('Header length is too short')
        try:
            # NOTE(review): the decompiled source lost the unicode branch
            # (it read ``None if isinstance(name, unicode) else name``);
            # encoding to UTF-8 matches the charset passed to Header below.
            if isinstance(name, unicode):
                tmp = name.encode('utf-8')
            else:
                tmp = name
            header = Header(tmp, 'utf-8', maxlinelen=maxlength)
        except UnicodeEncodeError:
            header = Header(name, self._charset, maxlinelen=maxlength)
        if not email:
            return header
        return '"%s" <%s>' % (header, email)

    def add_headers(self, msg, headers):
        """Encode every entry of the `headers` mapping and set it on `msg`."""
        for key, value in headers.items():
            msg[key] = self.encode_header(key, value)

    def get_smtp_address(self, address):
        """Extract a deliverable address from `address`, or return None.

        Accepts either a bare address or "Display Name <address>".  Returns
        None for empty input, input without an '@', addresses whose domain is
        listed in ``_ignore_domains``, and anything the address patterns
        reject.
        """
        if not address:
            return None
        at_sign, domain = address.partition('@')[1:]
        # The decompiled code went on to consult email_map / config / env
        # here, none of which this class defines, so that path could only
        # raise AttributeError.  Such addresses are simply unresolvable.
        if not at_sign or domain.lower() in self._ignore_domains:
            return None
        found = self.shortaddr_re.search(address)
        if found:
            return found.group(0)
        found = self.longaddr_re.search(address)
        if found:
            return found.group(2)
        return None

    def _init_pref_encoding(self):
        """Build ``self._charset`` according to ``_mime_encoding``.

        Raises AssertionError for an unrecognised encoding name.
        """
        from email.charset import Charset, QP, BASE64
        charset = Charset()
        charset.input_charset = 'utf-8'
        pref = self._mime_encoding
        if pref == 'none':
            charset.header_encoding = None
            charset.body_encoding = None
            charset.input_codec = None
            charset.output_charset = 'ascii'
        elif pref in ('base64', 'qp', 'quoted-printable'):
            if pref == 'base64':
                encoding = BASE64
            else:
                encoding = QP
            charset.header_encoding = encoding
            charset.body_encoding = encoding
            charset.output_charset = 'utf-8'
            charset.input_codec = 'utf-8'
            charset.output_codec = 'utf-8'
        else:
            raise AssertionError('Invalid email encoding setting: %s' % pref)
        self._charset = charset

    def encode_header(self, key, value):
        """Encode `value` for header `key`.

        `value` may be a ``(name, email)`` tuple, a list of values (each
        encoded and joined with folded commas), or a string which is split
        into name and address when it looks like "Name <address>".
        """
        if isinstance(value, tuple):
            return self.format_header(key, value[0], value[1])
        if isinstance(value, list):
            # Each item is encoded against the same header key.
            return ',\n\t'.join([self.encode_header(key, v) for v in value])
        found = self.longaddr_re.match(value)
        if found:
            return self.format_header(key, found.group(1), found.group(2))
        return self.format_header(key, value)

    def begin_send(self):
        """Connect to the server, optionally start TLS, and log in.

        Raises AssertionError when TLS was requested but the server does not
        advertise STARTTLS.
        """
        self.server = smtplib.SMTP(self.smtp_server, self.smtp_port)
        if self._use_tls:
            self.server.ehlo()
            if 'starttls' not in self.server.esmtp_features:
                raise AssertionError('TLS enabled but server does not support TLS')
            self.server.starttls()
            # Re-identify over the now-encrypted channel.
            self.server.ehlo()
        if self.user_name:
            # NOTE(review): the decompiled source shows a ``try`` with its
            # handler missing; login is treated as best-effort and a failure
            # is reported instead of aborting the send - confirm.
            try:
                self.server.login(self.user_name, self.password)
            except Exception:
                traceback.print_exc()

    def send_email(self, to='', subject='', body='', cc='', bcc=''):
        """Connect, send one message to `to`, and disconnect.

        `cc` and `bcc` are accepted for interface compatibility but are not
        used.  The connection is closed even when sending fails; errors while
        closing are reported and otherwise ignored.
        """
        self.begin_send()
        try:
            self.send([to], [], subject, body)
        finally:
            try:
                self.finish_send()
            except Exception:
                traceback.print_exc()

    def send(self, torcpts, ccrcpts, subject, body, mime_headers=None):
        """Build the message and hand it to the connected server.

        torcpts      -- iterable of recipient strings; invalid ones and
                        duplicates are dropped.
        ccrcpts      -- accepted but not used.
        subject/body -- message subject and text body.
        mime_headers -- optional mapping of extra headers.

        Returns None; nothing is sent when no valid recipient remains.
        Requires ``begin_send`` to have set ``self.server``.
        """
        from email.mime.text import MIMEText
        recipients = []
        for rcpt in torcpts:
            addr = self.get_smtp_address(rcpt)
            if addr and addr not in recipients:
                recipients.append(addr)
        if not recipients:
            return None
        headers = {
            'Subject': subject,
            'From': (self.from_name, self.from_name),
            'To': ', '.join(recipients),
        }
        msg = MIMEText(body.encode('utf-8'), 'plain')
        # Drop the default transfer encoding so set_charset can apply ours.
        del msg['Content-Transfer-Encoding']
        msg.set_charset(self._charset)
        self.add_headers(msg, headers)
        if mime_headers:
            self.add_headers(msg, mime_headers)
        # Normalise every line ending to CRLF before handing off.
        msgtext = self._newline_re.sub('\r\n', msg.as_string())
        self.server.sendmail(msg['From'], recipients, msgtext)

    def finish_send(self):
        """Close the SMTP session.

        With TLS enabled an SSL error raised by QUIT is ignored.
        NOTE(review): the handler was garbled in the decompiled source; this
        is the evident try/except-sslerror shape - confirm.
        """
        if not self._use_tls:
            self.server.quit()
            return
        import socket
        # socket.sslerror only exists on Python 2; fall back to socket.error.
        ssl_error = getattr(socket, 'sslerror', socket.error)
        try:
            self.server.quit()
        except ssl_error:
            pass
from common.emailaccount import EmailAccount
from util import threaded
class SMTPEmailAccount(EmailAccount):
    """EmailAccount that also carries SMTP settings for sending mail.

    Two secrets are kept, both in the form produced by ``profile.crypt_pw``:
    the account password (``_encrypted_pw``) and the SMTP password
    (``_encrypted_smtppw``).  The ``password`` property exposes them as one
    "glued" value built with ``pack_pstr`` and split again with
    ``unpack_pstr``.

    NOTE(review): reconstructed from decompiled bytecode; places where the
    original expression was garbled are marked NOTE(review).
    """

    DEFAULT_SMTP_REQUIRE_SSL = False
    DEFAULT_SMTP_PORT = 25

    def __init__(self, **options):
        """Pull the SMTP-specific options out of `options`, pass the rest on.

        Missing SMTP options fall back to ``self.default(<name>)``.
        ``options['password']`` is required (KeyError otherwise) and is
        stored as given; ``smtp_password`` is passed through
        ``profile.crypt_pw`` before being stored.
        """
        d = self.default
        self.smtp_server = options.pop('smtp_server', d('smtp_server'))
        self.smtp_require_ssl = options.pop('smtp_require_ssl', d('smtp_require_ssl'))
        self.smtp_port = options.pop('smtp_port', d('smtp_port'))
        self.smtp_username = options.pop('smtp_username', d('smtp_username'))
        self._email_address = options.pop('email_address', d('email_address'))
        self._encrypted_smtppw = profile.crypt_pw(options.pop('smtp_password', d('smtp_password')))
        self._encrypted_pw = options.pop('password')
        # self.password is the glued form of the two values stored above.
        EmailAccount.__init__(self, password=self.password, **options)

    def get_email_address(self):
        """Return the stored e-mail address."""
        return self._email_address

    def set_email_address(self, val):
        """Replace the stored e-mail address."""
        self._email_address = val

    @property
    def from_name(self):
        """Sender shown on outgoing mail: ``self.email_address``.

        ``email_address`` is not defined in this class; presumably the base
        class provides it - verify.
        """
        return self.email_address

    @classmethod
    def _unglue_pw(cls, password):
        """Split a glued password into ``(encrypted_pw, encrypted_smtppw)``.

        The SMTP part becomes an (encrypted) empty string when a second
        packed string cannot be read from the glued value.
        """
        packed = profile.plain_pw(password).encode('utf-8')
        account_pw, rest = unpack_pstr(packed)
        try:
            smtp_pw = unpack_pstr(rest)[0]
        except Exception:
            smtp_pw = u''
        return (profile.crypt_pw(account_pw.decode('utf-8')),
                profile.crypt_pw(smtp_pw.decode('utf-8')))

    def _set_password(self, password):
        """Store a glued password; if it cannot be split, keep it whole.

        On failure the traceback is printed, the value is stored unchanged
        as the account password and the SMTP password is cleared.
        """
        try:
            self._encrypted_pw, self._encrypted_smtppw = self._unglue_pw(password)
        except Exception:
            traceback.print_exc()
            self._encrypted_pw = password
            self._encrypted_smtppw = ''

    @classmethod
    def _glue_pw(cls, encrypted_pw, encrypted_smtppw):
        """Combine both encrypted passwords into one encrypted glued value."""
        packed = [
            pack_pstr(profile.plain_pw(secret).encode('utf-8')).decode('utf-8')
            for secret in (encrypted_pw, encrypted_smtppw)
        ]
        return profile.crypt_pw(packed[0] + packed[1])

    def _get_password(self):
        """Return the glued form of the two stored passwords."""
        return self._glue_pw(self._encrypted_pw, self._encrypted_smtppw)

    password = property(_get_password, _set_password)

    def _decryptedpw(self):
        """Return the account password run through ``profile.plain_pw``."""
        return profile.plain_pw(self._encrypted_pw)

    def _decrypted_smtppw(self):
        """Return the SMTP password run through ``profile.plain_pw``."""
        return profile.plain_pw(self._encrypted_smtppw)

    smtp_password = property(_decrypted_smtppw)

    def update_info(self, **info):
        """Apply password changes from `info`, then defer to EmailAccount.

        When the account is not flagged NETWORK_FLAG, 'password' and
        'smtp_password' are handled as separate values; otherwise 'password'
        is a glued value assigned through the ``password`` property.
        NOTE(review): the ``else`` is attached to the outer ``if`` - the
        decompiled source lost its indentation - confirm.
        """
        if not self.isflagged(NETWORK_FLAG):
            if 'password' in info:
                self._encrypted_pw = info.pop('password')
            if 'smtp_password' in info:
                self._encrypted_smtppw = profile.crypt_pw(info.pop('smtp_password'))
        elif 'password' in info:
            # Guarded so an update that carries no password is not a KeyError.
            self.password = info.pop('password')
        EmailAccount.update_info(self, **info)

    def _get_options(self):
        """Return the base-class options extended with the SMTP settings."""
        opts = EmailAccount._get_options(self)
        for attr in 'smtp_server smtp_port smtp_require_ssl smtp_username email_address'.split():
            opts[attr] = getattr(self, attr)
        return opts

    @classmethod
    def from_net(cls, info):
        """Build an account from `info`, whose password is a glued value.

        ``info.password`` is rewritten in place to the account part only and
        the SMTP part is passed on as ``smtp_password``.  If the value cannot
        be split, the traceback is printed and it is used whole with an
        empty SMTP password.
        """
        glued = info.password
        try:
            encrypted_pw, encrypted_smtppw = cls._unglue_pw(glued)
        except Exception:
            traceback.print_exc()
            encrypted_pw = glued
            encrypted_smtppw = u''
        info.password = encrypted_pw
        smtppw = profile.plain_pw(encrypted_smtppw)
        return EmailAccount.from_net(info, smtp_password=smtppw)

    @threaded
    def send_email(self, to='', subject='', body='', cc='', bcc=''):
        """Send a message through this account's SMTP server.

        Wrapped with ``threaded``; the work itself is delegated to
        ``SMTPsender.send_email``.
        """
        # Prefer the dedicated SMTP login, else the account user name.
        un = self.smtp_username or self.username
        f = self.from_name
        password = self._decrypted_smtppw()
        if not password:
            if self.smtp_username:
                password = ''
            else:
                # NOTE(review): the decompiler dropped this operand (it
                # printed ``None``); falling back to the account password
                # when no separate SMTP login is configured is the evident
                # intent - confirm.
                password = self._decryptedpw()
        srv = self.smtp_server
        if srv in ('smtp.aol.com', 'smtp.aim.com'):
            # For these servers the sender is derived from the login name.
            if un.endswith('aol.com') or un.endswith('aim.com'):
                f = un
            else:
                f = un + u'@aol.com'
        s = SMTPsender(un, password, srv, from_name=f, use_tls=self.smtp_require_ssl)
        s.send_email(to=to, subject=subject, body=body, cc=cc, bcc=bcc)

    # NOTE(review): placed inside the class because the source's indentation
    # was lost and the name suggests a property-style descriptor - confirm.
    mailclient = localprefprop(EmailAccount.mailclient_localprefs_key, 'sysdefault')